package window;

import bean.WaterSensor;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.junit.Test;

import java.time.Duration;

public class Flink_WaterMark_Window {
    //TODO 窗口+水位线
    @Test
    public void test() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        SingleOutputStreamOperator<WaterSensor> input = env.socketTextStream("node193", 9999).map(new MapFunction<String, WaterSensor>() {
            @Override
            public WaterSensor map(String value) throws Exception {
                String[] split = value.split(",");
                return new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2]));
            }
        });

        WatermarkStrategy<WaterSensor> waterSensorWatermarkStrategy = WatermarkStrategy
                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                .withTimestampAssigner((SerializableTimestampAssigner<WaterSensor>) (element, recordTimestamp) -> element.getTs() * 1000);

        input.assignTimestampsAndWatermarks(waterSensorWatermarkStrategy)
                .keyBy(WaterSensor::getId)
//                .window(EventTimeSessionWindows.withGap(Time.seconds(5)))//会话窗口
//                .window(TumblingEventTimeWindows.of(Time.seconds(5)))//滚动窗口
                .window(TumblingEventTimeWindows.of(Time.seconds(5),Time.seconds(3)))//划动窗口
                .reduce((ReduceFunction<WaterSensor>) (value1, value2) -> value2)
                .print();
        env.execute();
    }
}
